{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7b19a0cd-31da-45b7-91a4-9cd561f3d3d8",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# Feathr Fraud Detection Sample\n",
    "\n",
     "This notebook illustrates how to use the Feature Store to create a model that predicts the fraud status of transactions based on user account data and transaction data. The main focus of this notebook is to show:\n",
     "* How a feature designer can use Feathr to define heterogeneous features with different keys from different data sources (user account data and transaction data), and\n",
     "* How a feature consumer can extract features using multiple `FeatureQuery` objects.\n",
    "\n",
    "The sample fraud transaction datasets that are used in the notebook can be found here: https://github.com/microsoft/r-server-fraud-detection.\n",
    "\n",
    "The outline of the notebook is as follows: \n",
    "1. Setup Feathr environment\n",
    "2. Initialize Feathr client \n",
    "3. Define features\n",
    "4. Build features and extract offline features\n",
    "5. Build a fraud detection model\n",
    "6. Materialize features"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Setup Feathr Environment\n",
    "\n",
    "### Deploy Necessary Azure Resources to run Feathr Feature Store\n",
    "\n",
     "Prior to running the notebook, if you have not yet deployed all the required resources, please follow the steps in this guide: https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html\n",
    "\n",
    "### Access to Resources\n",
     "To run the cells below, you need additional permissions for your managed identity to access the Key Vault and the Storage Account. You may run the following commands in the Cloud Shell to grant yourself access.\n",
    "\n",
    "```\n",
    "userId=<email_id_of_account_requesting_access>\n",
    "resource_prefix=<resource_prefix>\n",
    "synapse_workspace_name=\"${resource_prefix}syws\"\n",
    "keyvault_name=\"${resource_prefix}kv\"\n",
    "objectId=$(az ad user show --id $userId --query id -o tsv)\n",
    "az keyvault update --name $keyvault_name --enable-rbac-authorization false\n",
    "az keyvault set-policy -n $keyvault_name --secret-permissions get list --object-id $objectId\n",
    "az role assignment create --assignee $userId --role \"Storage Blob Data Contributor\"\n",
    "az synapse role assignment create --workspace-name $synapse_workspace_name --role \"Synapse Contributor\" --assignee $userId\n",
    "```\n",
    "\n",
    "### Install Python Packages\n",
    "\n",
     "Uncomment the following cell and run it to install the Feathr python package and the necessary dependencies."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b9c63dd5-304e-4797-a230-8fb753710dbc",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
     "# Install feathr from the latest code in the repo. You may use `pip install feathr[notebook]` as well.\n",
    "# %pip install \"git+https://github.com/feathr-ai/feathr.git#subdirectory=feathr_project&egg=feathr[notebook]\"  "
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Initialize Feathr Client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "69222adf-1cb0-410b-b98d-e22877f358c0",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "from datetime import datetime, timedelta\n",
    "import os\n",
    "from pathlib import Path\n",
    "\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "\n",
    "import feathr\n",
    "from feathr import (\n",
    "    FeathrClient,\n",
    "    STRING, BOOLEAN, FLOAT, INT32, ValueType,\n",
    "    Feature, DerivedFeature, FeatureAnchor,\n",
    "    BackfillTime, MaterializationSettings,\n",
    "    FeatureQuery, ObservationSettings,\n",
    "    RedisSink,\n",
    "    HdfsSource,\n",
    "    WindowAggTransformation,\n",
    "    TypedKey,\n",
    ")\n",
    "from feathr.datasets.constants import (\n",
    "    FRAUD_DETECTION_ACCOUNT_INFO_URL,\n",
    "    FRAUD_DETECTION_FRAUD_TRANSACTIONS_URL,\n",
    "    FRAUD_DETECTION_UNTAGGED_TRANSACTIONS_URL,\n",
    ")\n",
    "from feathr.datasets.utils import maybe_download\n",
    "from feathr.utils.config import generate_config\n",
    "from feathr.utils.job_utils import get_result_df\n",
    "from feathr.utils.platform import is_databricks\n",
    "\n",
    "\n",
    "print(f\"Feathr version: {feathr.__version__}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "RESOURCE_PREFIX = \"\"  # TODO fill the value used to deploy the resources via ARM template\n",
    "PROJECT_NAME = \"fraud_detection\"\n",
    "\n",
     "# Currently supported: 'azure_synapse', 'databricks', and 'local'\n",
    "SPARK_CLUSTER = \"local\"\n",
    "\n",
    "# TODO fill values to use databricks cluster:\n",
    "DATABRICKS_CLUSTER_ID = None             # Set Databricks cluster id to use an existing cluster\n",
    "if is_databricks():\n",
    "    # If this notebook is running on Databricks, its context can be used to retrieve token and instance URL\n",
    "    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()\n",
    "    DATABRICKS_WORKSPACE_TOKEN_VALUE = ctx.apiToken().get()\n",
    "    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f\"https://{ctx.tags().get('browserHostName').get()}\"\n",
    "else:\n",
    "    DATABRICKS_WORKSPACE_TOKEN_VALUE = None                  # Set Databricks workspace token to use databricks\n",
    "    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = None  # Set Databricks workspace url to use databricks\n",
    "\n",
    "# TODO fill values to use Azure Synapse cluster:\n",
    "AZURE_SYNAPSE_SPARK_POOL = None  # Set Azure Synapse Spark pool name\n",
    "AZURE_SYNAPSE_URL = None         # Set Azure Synapse workspace url to use Azure Synapse\n",
    "ADLS_KEY = None                  # Set Azure Data Lake Storage key to use Azure Synapse\n",
    "\n",
    "USE_CLI_AUTH = False  # Set to True to use CLI authentication\n",
    "\n",
    "# An existing Feathr config file path. If None, we'll generate a new config based on the constants in this cell.\n",
    "FEATHR_CONFIG_PATH = None\n",
    "\n",
    "# (For the notebook test pipeline) If true, use ScrapBook package to collect the results.\n",
    "SCRAP_RESULTS = False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if SPARK_CLUSTER == \"azure_synapse\" and not os.environ.get(\"ADLS_KEY\"):\n",
    "    os.environ[\"ADLS_KEY\"] = ADLS_KEY\n",
    "elif SPARK_CLUSTER == \"databricks\" and not os.environ.get(\"DATABRICKS_WORKSPACE_TOKEN_VALUE\"):\n",
    "    os.environ[\"DATABRICKS_WORKSPACE_TOKEN_VALUE\"] = DATABRICKS_WORKSPACE_TOKEN_VALUE"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a8a70f27-d520-4d3c-bb8c-f364f84cb738",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# Get an authentication credential to access Azure resources and register features\n",
    "if USE_CLI_AUTH:\n",
    "    # Use AZ CLI interactive browser authentication\n",
    "    !az login --use-device-code\n",
    "    from azure.identity import AzureCliCredential\n",
    "    credential = AzureCliCredential(additionally_allowed_tenants=['*'],)\n",
    "elif \"AZURE_TENANT_ID\" in os.environ and \"AZURE_CLIENT_ID\" in os.environ and \"AZURE_CLIENT_SECRET\" in os.environ:\n",
    "    # Use Environment variable secret\n",
    "    from azure.identity import EnvironmentCredential\n",
    "    credential = EnvironmentCredential()\n",
    "else:\n",
    "    # Try to use the default credential\n",
    "    from azure.identity import DefaultAzureCredential\n",
    "    credential = DefaultAzureCredential(\n",
    "        exclude_interactive_browser_credential=False,\n",
    "        additionally_allowed_tenants=['*'],\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "# Redis password\n",
    "if 'REDIS_PASSWORD' not in os.environ:\n",
    "    from azure.keyvault.secrets import SecretClient\n",
    "    vault_url = f\"https://{RESOURCE_PREFIX}kv.vault.azure.net\"\n",
    "    secret_client = SecretClient(vault_url=vault_url, credential=credential)\n",
    "    retrieved_secret = secret_client.get_secret('FEATHR-ONLINE-STORE-CONN').value\n",
    "    os.environ['REDIS_PASSWORD'] = retrieved_secret.split(\",\")[1].split(\"password=\", 1)[1]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate a config file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "50b2f73e-6380-42c3-91e8-4f3e15bc10d6",
     "showTitle": false,
     "title": ""
    },
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "if FEATHR_CONFIG_PATH:\n",
    "    config_path = FEATHR_CONFIG_PATH\n",
    "else:\n",
    "    config_path = generate_config(\n",
    "        resource_prefix=RESOURCE_PREFIX,\n",
    "        project_name=PROJECT_NAME,\n",
    "        spark_config__spark_cluster=SPARK_CLUSTER,\n",
    "        spark_config__azure_synapse__dev_url=AZURE_SYNAPSE_URL,\n",
    "        spark_config__azure_synapse__pool_name=AZURE_SYNAPSE_SPARK_POOL,\n",
    "        spark_config__databricks__workspace_instance_url=SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL,\n",
    "        databricks_cluster_id=DATABRICKS_CLUSTER_ID,\n",
    "    )\n",
    "\n",
    "with open(config_path, 'r') as f: \n",
    "    print(f.read())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "eab0957c-c906-4297-a729-8dd8d79cb629",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Initialize Feathr client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3734eee3-12f9-44db-a440-ad375ef859f0",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "client = FeathrClient(config_path=config_path, credential=credential)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Define Features\n",
    "\n",
    "### Prepare datasets\n",
    "\n",
    "We prepare the fraud detection dataset as follows:\n",
    "\n",
    "1. Download Account info data, fraud transactions data, and untagged transactions data.\n",
    "2. Tag transaction data based on the fraud transactions data.\n",
    "    1. Aggregate the Fraud table on the account level, creating a start and end datetime. \n",
    "    2. Join this data with the untagged data.\n",
    "    3. Tag the data: `is_fraud = 0` for non fraud, `1` for fraud. \n",
     "3. Upload the data files to cloud storage so that Feathr's target cluster can consume them.\n",
    "\n",
    "![Fraud Detection Workflow Visual](../images/fraud-detection-visual.png)\n",
    "\n",
     "To learn more about the fraud detection scenario, the source dataset we use, and how we tag the transactions, please see [here](https://microsoft.github.io/r-server-fraud-detection/data-scientist.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use dbfs if the notebook is running on Databricks\n",
    "if is_databricks():\n",
    "    WORKING_DIR = f\"/dbfs/{PROJECT_NAME}\"\n",
    "else:\n",
    "    WORKING_DIR = PROJECT_NAME"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Download datasets\n",
    "account_info_file_path = f\"{WORKING_DIR}/account_info.csv\"\n",
    "fraud_transactions_file_path = f\"{WORKING_DIR}/fraud_transactions.csv\"\n",
    "obs_transactions_file_path = f\"{WORKING_DIR}/obs_transactions.csv\"\n",
    "maybe_download(\n",
    "    src_url=FRAUD_DETECTION_ACCOUNT_INFO_URL,\n",
    "    dst_filepath=account_info_file_path,\n",
    ")\n",
    "maybe_download(\n",
    "    src_url=FRAUD_DETECTION_FRAUD_TRANSACTIONS_URL,\n",
    "    dst_filepath=fraud_transactions_file_path,\n",
    ")\n",
    "maybe_download(\n",
    "    src_url=FRAUD_DETECTION_UNTAGGED_TRANSACTIONS_URL,\n",
    "    dst_filepath=obs_transactions_file_path,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load datasets\n",
    "fraud_df = pd.read_csv(fraud_transactions_file_path)\n",
    "obs_df = pd.read_csv(obs_transactions_file_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Combine transactionDate and transactionTime into one column. E.g. \"20130903\", \"013641\" -> \"20130903 013641\"\n",
    "fraud_df[\"timestamp\"] = fraud_df[\"transactionDate\"].astype(str) + \" \" + fraud_df[\"transactionTime\"].astype(str).str.zfill(6)\n",
    "obs_df[\"timestamp\"] = obs_df[\"transactionDate\"].astype(str) + \" \" + obs_df[\"transactionTime\"].astype(str).str.zfill(6)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
     "In this step, we compute the timestamp range during which the frauds happened for each account by referencing the transaction-level fraud data.\n",
     "We then create the label `is_fraud` for the untagged transaction data based on that range."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
     "# For each account in the fraud transaction data, get the timestamp range during which the fraud transactions happened.\n",
    "fraud_labels_df = fraud_df.groupby(\"accountID\").agg({\"timestamp\": ['min', 'max']})\n",
    "fraud_labels_df.columns = [\"_\".join(col) for col in fraud_labels_df.columns.values]\n",
    "fraud_labels_df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Combine fraud and untagged transaction data to generate the tagged transaction data.\n",
    "transactions_df = pd.concat([fraud_df, obs_df], ignore_index=True).merge(\n",
    "    fraud_labels_df,\n",
    "    on=\"accountID\",\n",
    "    how=\"outer\",\n",
    ")\n",
    "\n",
    "# Data cleaning\n",
    "transactions_df.dropna(\n",
    "    subset=[\n",
    "        \"accountID\",\n",
    "        \"transactionID\",\n",
    "        \"transactionAmount\",\n",
    "        \"localHour\",\n",
    "        \"timestamp\",\n",
    "    ],\n",
    "    inplace=True,\n",
    ")\n",
    "transactions_df.sort_values(\"timestamp\", inplace=True)\n",
    "transactions_df.drop_duplicates(inplace=True)\n",
    "\n",
     "# is_fraud = 1 if the transaction falls within the account's fraud timestamp range; otherwise is_fraud = 0.\n",
    "transactions_df[\"is_fraud\"] = np.logical_and(\n",
    "    transactions_df[\"timestamp_min\"] <= transactions_df[\"timestamp\"],\n",
    "    transactions_df[\"timestamp\"] <= transactions_df[\"timestamp_max\"],\n",
    ").astype(int)\n",
    "\n",
    "transactions_df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "transactions_df[\"is_fraud\"].value_counts()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save the tagged transaction data into file\n",
    "transactions_file_path = f\"{WORKING_DIR}/transactions.csv\"\n",
    "transactions_df.to_csv(transactions_file_path, index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Upload files to cloud if needed\n",
    "if client.spark_runtime == \"local\":\n",
    "    # In local mode, we can use the same data path as the source.\n",
     "    # If the notebook is running on Databricks, the file paths should already be dbfs paths.\n",
    "    account_info_source_path = account_info_file_path\n",
    "    transactions_source_path = transactions_file_path\n",
    "elif client.spark_runtime == \"databricks\" and is_databricks():\n",
    "    # If the notebook is running on databricks, we can use the same data path as the source.\n",
    "    account_info_source_path = account_info_file_path.replace(\"/dbfs\", \"dbfs:\")\n",
    "    transactions_source_path = transactions_file_path.replace(\"/dbfs\", \"dbfs:\")\n",
    "else:\n",
    "    # Otherwise, upload the local file to the cloud storage (either dbfs or adls).\n",
    "    account_info_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(account_info_file_path)\n",
    "    transactions_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(transactions_file_path)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "f6adbca1-5642-4ac1-bff7-e7c9d4d9e5b2",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
     "Now, we can define the following features:\n",
    "- Account features: Account-level features that will be joined to observation data on accountID\n",
    "- Transaction features: The features that will be joined to observation data on transactionID\n",
    "- Transaction aggregated features: The features aggregated by accountID\n",
    "- Derived features: The features derived from other features\n",
    "\n",
     "Some important concepts include `HdfsSource`, `TypedKey`, `Feature`, `FeatureAnchor`, and `DerivedFeature`. Please refer to the Feathr [documentation](https://feathr.readthedocs.io/en/latest/feathr.html) to learn more about the details.\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b073b509-0f95-4e23-b16b-ffd8190fb6a2",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Define account features\n",
    "\n",
    "Let's first check the account data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Check account data\n",
    "pd.read_csv(account_info_file_path).head()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here, we use `accountCountry`, `isUserRegistered`, `numPaymentRejects1dPerUser`, and `accountAge` as the account features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def account_preprocessing(df):\n",
     "    \"\"\"Select the account feature columns and drop rows with a missing accountID.\"\"\"\n",
    "    return df.select(\n",
    "        \"accountID\",\n",
    "        \"accountCountry\",\n",
    "        \"isUserRegistered\",\n",
    "        \"numPaymentRejects1dPerUser\",\n",
    "        \"accountAge\",\n",
    "    ).dropna(subset=[\"accountID\"])\n",
    "\n",
    "\n",
    "account_info_source = HdfsSource(\n",
    "    name=\"account_data\",\n",
    "    path=account_info_source_path,\n",
    "    preprocessing=account_preprocessing,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b3668eeb-e4a0-4327-baf6-5521c856f51d",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# Account features will be joined to observation data on accountID\n",
    "account_id = TypedKey(\n",
    "    key_column=\"accountID\",\n",
    "    key_column_type=ValueType.STRING,\n",
    "    description=\"account id\",\n",
    ")\n",
    "\n",
    "account_features = [\n",
    "    Feature(\n",
    "        name=\"account_country_code\",\n",
    "        key=account_id,\n",
    "        feature_type=STRING, \n",
    "        transform=\"accountCountry\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"is_user_registered\",\n",
    "        key=account_id,\n",
    "        feature_type=BOOLEAN,\n",
    "        transform=\"isUserRegistered==TRUE\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"num_payment_rejects_1d_per_user\",\n",
    "        key=account_id,\n",
    "        feature_type=INT32,\n",
    "        transform=\"numPaymentRejects1dPerUser\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"account_age\",\n",
    "        key=account_id,\n",
    "        feature_type=INT32,\n",
    "        transform=\"accountAge\",\n",
    "    ),\n",
    "]\n",
    "\n",
    "account_anchor = FeatureAnchor(\n",
    "    name=\"account_features\",\n",
    "    source=account_info_source,\n",
    "    features=account_features,\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "6f12c07e-4faf-4411-8acd-6f5d13b962f8",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Define transaction features\n",
    "\n",
    "We already checked the transaction dataset when we tagged the fraud label `is_fraud`. So, let's jump to defining features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def transaction_preprocessing(df):\n",
     "    \"\"\"Uppercase the ipCountryCode column in the transaction data.\"\"\"\n",
    "    import pyspark.sql.functions as F\n",
    "\n",
    "    return df.withColumn(\"ipCountryCode\", F.upper(\"ipCountryCode\"))\n",
    "\n",
    "\n",
    "transactions_source = HdfsSource(\n",
    "    name=\"transaction_data\",\n",
    "    path=transactions_source_path,\n",
    "    event_timestamp_column=\"timestamp\",\n",
    "    timestamp_format=\"yyyyMMdd HHmmss\",\n",
    "    preprocessing=transaction_preprocessing,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "280062b9-ae21-4a1a-ae94-86a5c17fd589",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# Transaction features will be joined to observation data on transactionID\n",
    "transaction_id = TypedKey(\n",
    "    key_column=\"transactionID\",\n",
    "    key_column_type=ValueType.STRING,\n",
    "    description=\"transaction id\",\n",
    ")\n",
    "\n",
    "transaction_amount = Feature(\n",
    "    name=\"transaction_amount\",\n",
    "    key=transaction_id,\n",
    "    feature_type=FLOAT,\n",
    "    transform=\"transactionAmount\",\n",
    ")\n",
    "\n",
    "transaction_features = [\n",
    "    transaction_amount,\n",
    "    Feature(\n",
    "        name=\"transaction_country_code\",\n",
    "        key=transaction_id,\n",
    "        feature_type=STRING,\n",
    "        transform=\"ipCountryCode\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"transaction_time\",\n",
    "        key=transaction_id,\n",
    "        feature_type=FLOAT,\n",
    "        transform=\"localHour\",  # Local time of the transaction\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"is_proxy_ip\",\n",
    "        key=transaction_id,\n",
    "        feature_type=STRING,  # [nan, True, False]\n",
    "        transform=\"isProxyIP\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"cvv_verify_result\",\n",
    "        key=transaction_id,\n",
    "        feature_type=STRING,  # [nan, 'M', 'P', 'N', 'X', 'U', 'S', 'Y']\n",
    "        transform=\"cvvVerifyResult\",\n",
    "    ),\n",
    "]\n",
    "\n",
    "transaction_feature_anchor = FeatureAnchor(\n",
    "    name=\"transaction_features\",\n",
    "    source=transactions_source,\n",
    "    features=transaction_features,\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "86ac05e1-26bb-4820-87ea-f547e3561181",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
     "### Define aggregated transaction features"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "4c969554-f690-42f5-b70a-d962bf558b03",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
     "# Average transaction amount over the past week\n",
    "avg_transaction_amount = Feature(\n",
    "    name=\"avg_transaction_amount\",\n",
    "    key=account_id,\n",
    "    feature_type=FLOAT,\n",
    "    transform=WindowAggTransformation(\n",
    "        agg_expr=\"cast_float(transactionAmount)\", agg_func=\"AVG\", window=\"7d\"\n",
    "    ),\n",
    ")\n",
    "\n",
    "agg_features = [\n",
    "    avg_transaction_amount,\n",
     "    # Number of transactions in the past day\n",
    "    Feature(\n",
    "        name=\"num_transaction_count_in_day\",\n",
    "        key=account_id,\n",
    "        feature_type=INT32,\n",
    "        transform=WindowAggTransformation(\n",
    "            agg_expr=\"transactionID\", agg_func=\"COUNT\", window=\"1d\"\n",
    "        ),\n",
    "    ),\n",
     "    # Number of transactions in the past week\n",
    "    Feature(\n",
    "        name=\"num_transaction_count_in_week\",\n",
    "        key=account_id,\n",
    "        feature_type=INT32,\n",
    "        transform=WindowAggTransformation(\n",
    "            agg_expr=\"transactionID\", agg_func=\"COUNT\", window=\"7d\"\n",
    "        ),\n",
    "    ),\n",
     "    # Total transaction amount in the past day\n",
    "    Feature(\n",
    "        name=\"total_transaction_amount_in_day\",\n",
    "        key=account_id,\n",
    "        feature_type=FLOAT,\n",
    "        transform=WindowAggTransformation(\n",
    "            agg_expr=\"cast_float(transactionAmount)\", agg_func=\"SUM\", window=\"1d\"\n",
    "        ),\n",
    "    ),\n",
     "    # Average local hour of transactions over the past week\n",
    "    Feature(\n",
    "        name=\"avg_transaction_time_in_week\",\n",
    "        key=account_id,\n",
    "        feature_type=FLOAT,\n",
    "        transform=WindowAggTransformation(\n",
    "            agg_expr=\"cast_float(localHour)\", agg_func=\"AVG\", window=\"7d\"\n",
    "        ),\n",
    "    ),\n",
    "]\n",
    "\n",
    "agg_anchor = FeatureAnchor(\n",
    "    name=\"transaction_agg_features\",\n",
    "    source=transactions_source,\n",
    "    features=agg_features,\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "17cc5132-461f-4d3d-b517-1f7e69d23252",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Define derived features"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7ac10ce4-e222-469c-bb2e-1658b45e3eda",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "derived_features = [\n",
    "    DerivedFeature(\n",
    "        name=\"diff_between_current_and_avg_amount\",\n",
    "        key=[transaction_id, account_id],\n",
    "        feature_type=FLOAT,\n",
    "        input_features=[transaction_amount, avg_transaction_amount],\n",
    "        transform=\"transaction_amount - avg_transaction_amount\",\n",
    "    ),\n",
    "]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a9ec8416-9ac6-4499-b60f-55822265b893",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 4. Build Features and Extract Offline Features\n",
    "\n",
    "Now, let's build the features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "d9d32d4f-2b60-4978-bb87-c7d2160e98eb",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "client.build_features(\n",
    "    anchor_list=[\n",
    "        account_anchor,\n",
    "        transaction_feature_anchor,\n",
    "        agg_anchor,\n",
    "    ],\n",
    "    derived_feature_list=derived_features,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "account_feature_names = [feat.name for feat in account_features] + [feat.name for feat in agg_features]\n",
    "transactions_feature_names = [feat.name for feat in transaction_features]\n",
    "derived_feature_names = [feat.name for feat in derived_features]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
     "To extract offline feature values for features that have different keys, we use multiple `FeatureQuery` objects."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b6340f2f-79dc-442b-a202-b2f2078a62ac",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "account_feature_query = FeatureQuery(\n",
    "    feature_list=account_feature_names,\n",
    "    key=account_id,\n",
    ")\n",
    "\n",
    "transactions_feature_query = FeatureQuery(\n",
    "    feature_list=transactions_feature_names,\n",
    "    key=transaction_id,\n",
    ")\n",
    "\n",
    "derived_feature_query = FeatureQuery(\n",
    "    feature_list=derived_feature_names,\n",
    "    key=[transaction_id, account_id],\n",
    ")\n",
     "\n",
    "settings = ObservationSettings(\n",
    "    observation_path=transactions_source_path,\n",
    "    event_timestamp_column=\"timestamp\",\n",
    "    timestamp_format=\"yyyyMMdd HHmmss\",\n",
    ")\n",
     "\n",
    "client.get_offline_features(\n",
    "    observation_settings=settings,\n",
    "    feature_query=[account_feature_query, transactions_feature_query, derived_feature_query],\n",
     "    output_path=transactions_source_path.rpartition(\"/\")[0] + \"/fraud_transactions_features.avro\",\n",
    ")\n",
    "\n",
    "client.wait_job_to_finish(timeout_sec=5000)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = get_result_df(client)[\n",
    "    account_feature_names\n",
    "    + transactions_feature_names\n",
    "    + derived_feature_names\n",
    "    + [\"is_fraud\", \"timestamp\"]\n",
    "]\n",
    "\n",
    "df.head(5)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Build a Fraud Detection Model\n",
    "\n",
    "We use [Random Forest Classifier](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html) to build a fraud detection model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from plotly.subplots import make_subplots\n",
    "import plotly.graph_objects as go\n",
    "import plotly.express as px\n",
    "from sklearn.ensemble import RandomForestClassifier\n",
    "from sklearn.model_selection import train_test_split\n",
    "from sklearn.metrics import (\n",
    "    confusion_matrix,\n",
    "    f1_score,\n",
    "    precision_score,\n",
    "    recall_score,\n",
    "    PrecisionRecallDisplay,\n",
    ")\n",
    "from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Understand the dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.describe().T"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.nunique()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# plot only sub-samples for simplicity\n",
    "NUM_SAMPLES_TO_PLOT = 5000\n",
    "\n",
    "fig = px.scatter_matrix(\n",
    "    df.sample(n=NUM_SAMPLES_TO_PLOT, random_state=42),\n",
    "    dimensions=df.columns[:-2],  # exclude the label and timestamp\n",
    "    color=\"is_fraud\",\n",
    "    labels={col:col.replace('_', ' ') for col in df.columns}, # remove underscore\n",
    ")\n",
    "fig.update_traces(diagonal_visible=False, showupperhalf=False, marker_size=3, marker_opacity=0.5)\n",
    "fig.update_layout(\n",
    "    width=2000,\n",
    "    height=2000,\n",
    "    title={\"text\": \"Scatter matrix for transaction dataset\", \"font_size\": 20},\n",
    "    font_size=6,\n",
    ")\n",
    "fig.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Split training and validation sets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "n_train = int(len(df) * 0.7)\n",
    "\n",
    "train_df = df.iloc[:n_train]\n",
    "test_df = df.iloc[n_train:]\n",
    "\n",
    "print(f\"\"\"Training set:\n",
    "{train_df[\"is_fraud\"].value_counts()}\n",
    "\n",
    "Validation set:\n",
    "{test_df[\"is_fraud\"].value_counts()}\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Check the time range of the training and test set doesn't overlap\n",
    "train_df[\"timestamp\"].max(), test_df[\"timestamp\"].min()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Train and test a machine learning model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get labels as integers\n",
    "y_train = train_df[\"is_fraud\"].astype(int).to_numpy()\n",
    "y_test = test_df[\"is_fraud\"].astype(int).to_numpy()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# We convert categorical features into integer values by using one-hot-encoding and ordinal-encoding\n",
    "categorical_feature_names = [\n",
    "    \"account_country_code\",\n",
    "    \"transaction_country_code\",\n",
    "    \"cvv_verify_result\",\n",
    "]\n",
    "ordinal_feature_names = [\n",
    "    \"is_user_registered\",\n",
    "    \"is_proxy_ip\",\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "one_hot_encoder = OneHotEncoder(sparse_output=False).fit(df[categorical_feature_names])\n",
    "ordinal_encoder = OrdinalEncoder().fit(df[ordinal_feature_names])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ordinal_encoder.categories_"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "one_hot_encoder.categories_"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X_train = np.concatenate(\n",
    "    (\n",
    "        one_hot_encoder.transform(train_df[categorical_feature_names]),\n",
    "        ordinal_encoder.transform(train_df[ordinal_feature_names]),\n",
    "        train_df.drop(categorical_feature_names + ordinal_feature_names + [\"is_fraud\", \"timestamp\"], axis=\"columns\").fillna(0).to_numpy(),\n",
    "    ),\n",
    "    axis=1,\n",
    ")\n",
    "\n",
    "X_test = np.concatenate(\n",
    "    (\n",
    "        one_hot_encoder.transform(test_df[categorical_feature_names]),\n",
    "        ordinal_encoder.transform(test_df[ordinal_feature_names]),\n",
    "        test_df.drop(categorical_feature_names + ordinal_feature_names + [\"is_fraud\", \"timestamp\"], axis=\"columns\").fillna(0).to_numpy(),\n",
    "    ),\n",
    "    axis=1,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf = RandomForestClassifier(\n",
    "    n_estimators=50,\n",
    "    random_state=42,\n",
    ").fit(X_train, y_train)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.score(X_test, y_test)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "y_pred = clf.predict(X_test)\n",
    "y_pred"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "y_prob = clf.predict_proba(X_test)\n",
    "y_prob"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To measure the performance, we use recall, precision and F1 score that handle imbalanced data better."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display = PrecisionRecallDisplay.from_predictions(\n",
    "    y_test, y_prob[:, 1], name=\"RandomForestClassifier\"\n",
    ")\n",
    "_ = display.ax_.set_title(\"Fraud Detection Precision-Recall Curve\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "precision = precision_score(y_test, y_pred)\n",
    "recall = recall_score(y_test, y_pred)\n",
    "f1 = f1_score(y_test, y_pred)\n",
    "\n",
    "print(f\"\"\"Precision: {precision},\n",
    "Recall: {recall},\n",
    "F1: {f1}\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "confusion_matrix(y_test, y_pred)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Feature importance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "numeric_feature_names = [name for name in train_df.columns if name not in set(categorical_feature_names + ordinal_feature_names + [\"is_fraud\", \"timestamp\"])]\n",
    "numeric_feature_names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the order of features is [categorical features, ordinal features, numeric features]\n",
    "importances = clf.feature_importances_[-len(numeric_feature_names):]\n",
    "std = np.std([tree.feature_importances_[-len(numeric_feature_names):] for tree in clf.estimators_], axis=0)\n",
    "\n",
    "fig = px.bar(\n",
    "    pd.DataFrame([numeric_feature_names, importances, std], index=[\"Numeric features\", \"importances\", \"std\"]).T,\n",
    "    y=\"Numeric features\",\n",
    "    x=\"importances\",\n",
    "    error_x=\"std\",\n",
    "    orientation=\"h\",\n",
    "    title=\"Importance of the numeric features\",\n",
    ")\n",
    "fig.update_layout(showlegend=False, width=1000)\n",
    "fig.update_xaxes(title_text=\"Mean decrease in impurity\", range=[0, 0.5])\n",
    "fig.update_yaxes(title_text=\"Numeric features\")\n",
    "fig.show()\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_names = categorical_feature_names + ordinal_feature_names\n",
    "categories = one_hot_encoder.categories_ + ordinal_encoder.categories_\n",
    "\n",
    "start_i = 0\n",
    "n_rows = len(feature_names)\n",
    "\n",
    "fig = make_subplots(\n",
    "    rows=n_rows,\n",
    "    cols=1,\n",
    "    subplot_titles=[name.replace(\"_\", \" \") for name in feature_names],\n",
    "    x_title=\"Mean decrease in impurity\",\n",
    ")\n",
    "\n",
    "for i in range(n_rows):\n",
    "    category = categories[i]\n",
    "    end_i = start_i + len(category)\n",
    "\n",
    "    fig.add_trace(\n",
    "        go.Bar(\n",
    "            x=clf.feature_importances_[start_i:end_i],\n",
    "            y=category,\n",
    "            width=0.2,\n",
    "            error_x=dict(\n",
    "                type=\"data\",\n",
    "                array=np.std([tree.feature_importances_[start_i:end_i] for tree in clf.estimators_], axis=0),\n",
    "            ),\n",
    "            orientation=\"h\",\n",
    "        ),\n",
    "        row=i+1,\n",
    "        col=1,\n",
    "    )\n",
    "\n",
    "    start_i = end_i\n",
    "    \n",
    "fig.update_layout(title=\"Importance of the categorical features\", showlegend=False, width=1000, height=1000)\n",
    "fig.update_xaxes(range=[0, 0.5])\n",
    "fig.show()\n",
    "\n",
    "   "
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "83e69f23-aa4e-4893-8907-6d5f0792c23f",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## Materialize Features in Redis\n",
    "\n",
    "Now, we materialize features to `RedisSink` so that we can retrieve online features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "faad23c1-d827-4674-b630-83530574c27d",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "ACCOUNT_FEATURE_TABLE_NAME = \"fraudDetectionAccountFeatures\" \n",
    "\n",
    "backfill_time = BackfillTime(\n",
    "    start=datetime(2013, 8, 4),\n",
    "    end=datetime(2013, 8, 4),\n",
    "    step=timedelta(days=1),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.materialize_features(\n",
    "    MaterializationSettings(\n",
    "        ACCOUNT_FEATURE_TABLE_NAME,\n",
    "        backfill_time=backfill_time,\n",
    "        sinks=[RedisSink(table_name=ACCOUNT_FEATURE_TABLE_NAME)],\n",
    "        feature_names=account_feature_names[1:],\n",
    "    ),\n",
    "    allow_materialize_non_agg_feature=True,\n",
    ")\n",
    "\n",
    "client.wait_job_to_finish(timeout_sec=5000)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "1f5b191f-b1e8-49e4-b54d-ffc2f8c0a0b8",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "materialized_feature_values = client.get_online_features(\n",
    "    ACCOUNT_FEATURE_TABLE_NAME,\n",
    "    key=\"A1055520452832600\",\n",
    "    feature_names=account_feature_names[1:],\n",
    ")\n",
    "materialized_feature_values"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Scrap results for unit test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if SCRAP_RESULTS:\n",
    "    import scrapbook as sb\n",
    "    sb.glue(\"materialized_feature_values\", materialized_feature_values)\n",
    "    sb.glue(\"precision\", precision)\n",
    "    sb.glue(\"recall\", recall)\n",
    "    sb.glue(\"f1\", f1)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cleanup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Cleaning up the output files. CAUTION: this maybe dangerous if you \"reused\" the project name.\n",
    "import shutil\n",
    "shutil.rmtree(WORKING_DIR, ignore_errors=False)"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {
    "pythonIndentUnit": 4
   },
   "notebookName": "fraud_detection_feathr_test_2",
   "notebookOrigID": 1891349682974490,
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.15 (default, Nov 24 2022, 15:19:38) \n[GCC 11.2.0]"
  },
  "vscode": {
   "interpreter": {
    "hash": "e34a1a57d2e174682770a82d94a178aa36d3ccfaa21227c5d2308e319b7ae532"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
